Python exceptions.MissingSchema方法代码示例 | 您所在的位置:网站首页 > requests.exceptions.MissingSchema > Python exceptions.MissingSchema方法代码示例

Python exceptions.MissingSchema方法代码示例

2024-07-11 07:19| 来源: 网络整理| 查看: 265

本文整理汇总了Python中requests.exceptions.MissingSchema异常的典型用法代码示例。如果您正苦于以下问题:Python exceptions.MissingSchema异常的具体用法?Python exceptions.MissingSchema怎么用?Python exceptions.MissingSchema使用的例子?那么恭喜您,这里精选的代码示例或许可以为您提供帮助。您也可以进一步了解该异常所在模块requests.exceptions的用法示例。


示例1: pushArtifact
# Required import: from requests import exceptions [as alias]
# Or: from requests.exceptions import MissingSchema [as alias]
def pushArtifact(artifactFile, user, token, file, url, force):
    """Pushes the given file to the url with the provided user/token auth

    ``artifactFile`` is copied to ``file``, the copy is deployed to ``url``
    through ``artifactory.ArtifactoryPath`` and the copy is then removed.

    Raises:
        RuntimeError: with ``ERROR_ALREADY_PUSHED`` when ``force`` is falsy
            and the path at ``url`` already exists.
    """

    # Error and exit if artifact already exists and we are not forcing an override
    try:
        if (not force) and (artifactory.ArtifactoryPath(url, auth=(user, token)).exists()):
            raise RuntimeError(ERROR_ALREADY_PUSHED)
    except MissingSchema:
        # MissingSchema raised by the existence check is swallowed; execution
        # continues to the deploy step below.
        pass

    # Copy the artifact to `file`, deploy the copy, then remove the copy
    # (it is removed on success and on failure alike)
    print("Deploying {file} to {url}".format(file=file, url=url))
    path = artifactory.ArtifactoryPath(url, auth=(user, token))
    shutil.copyfile(artifactFile, file)
    try:
        path.deploy_file(file)
        os.remove(file)
    # NOTE(review): bare except also intercepts KeyboardInterrupt/SystemExit;
    # the exception is re-raised after the cleanup.
    except:
        os.remove(file)
        raise
开发者ID:carsdotcom,项目名称:skelebot,代码行数:22,代码来源

示例2: test_get_request_with_port
# Required import: from requests import exceptions [as alias]
# Or: from requests.exceptions import MissingSchema [as alias]
def test_get_request_with_port(self, mock_requests, request_mock, mock_session):
    """Check, per endpoint, that request_mock was called once with expected_url."""
    from requests.exceptions import MissingSchema

    with mock.patch(
        'airflow.hooks.base_hook.BaseHook.get_connection',
        side_effect=get_airflow_connection_with_port
    ):
        expected_url = ''
        for endpoint in ['some/endpoint', '/some/endpoint']:

            # NOTE(review): the statement that belongs inside this `try` is
            # missing from the extracted text, so the snippet is not valid
            # Python as shown. Recover it from the original project.
            try:
            except MissingSchema:
                pass

            request_mock.assert_called_once_with(
                mock.ANY,
                expected_url,
                headers=mock.ANY,
                params=mock.ANY
            )

            request_mock.reset_mock()
开发者ID:apache,项目名称:airflow,代码行数:25,代码来源

示例3: test_hook_with_method_in_lowercase
# Required import: from requests import exceptions [as alias]
# Or: from requests.exceptions import MissingSchema [as alias]
def test_hook_with_method_in_lowercase(self, mock_requests, request_mock):
    """Check that request_mock was called once with ``params=data``."""
    from requests.exceptions import MissingSchema, InvalidURL

    with mock.patch(
        'airflow.hooks.base_hook.BaseHook.get_connection',
        side_effect=get_airflow_connection_with_port
    ):
        data = "test params"
        # NOTE(review): the call expression that precedes `'v1/test'` is
        # missing from the extracted text; the fragment is kept verbatim and
        # is not valid Python as shown.
        try:'v1/test', data=data)
        except (MissingSchema, InvalidURL):
            pass
        request_mock.assert_called_once_with(
            mock.ANY,
            mock.ANY,
            headers=mock.ANY,
            params=data
        )
开发者ID:apache,项目名称:airflow,代码行数:19,代码来源

示例4: _validate_server_url
# Required import: from requests import exceptions [as alias]
# Or: from requests.exceptions import MissingSchema [as alias]
def _validate_server_url(self):
    """Validates self.server_url

    Sends a HEAD request to ``self.server_url``.

    Raises:
        InvenioConnectorServerError: when the response status code is >= 400,
            or when the request raises InvalidSchema, MissingSchema,
            ConnectionError, InvalidURL or any other RequestException.
    """
    try:
        request = requests.head(self.server_url)
        if request.status_code >= 400:
            raise InvenioConnectorServerError(
                "Unexpected status code '%d' accessing URL: %s"
                % (request.status_code, self.server_url))
    except (InvalidSchema, MissingSchema) as err:
        raise InvenioConnectorServerError(
            "Bad schema, expecting http:// or https://:\n %s" % (err,))
    except ConnectionError as err:
        raise InvenioConnectorServerError(
            "Couldn't establish connection to '%s':\n %s"
            % (self.server_url, err))
    except InvalidURL as err:
        raise InvenioConnectorServerError(
            "Invalid URL '%s':\n %s" % (self.server_url, err))
    # Catch-all for every remaining requests error; must stay last.
    except RequestException as err:
        raise InvenioConnectorServerError(
            "Unknown error connecting to '%s':\n %s"
            % (self.server_url, err))
开发者ID:indico,项目名称:indico-plugins,代码行数:25,代码来源

示例5: _load_url_or_file
# Required import: from requests import exceptions [as alias]
# Or: from requests.exceptions import MissingSchema [as alias]
def _load_url_or_file(self, url):
    """Fetch ``url`` over HTTP, falling back to opening it as a local file.

    Returns ``response.text`` when the GET response is ok. When the request
    raises MissingSchema or the response is not ok, ``url`` is opened as a
    file and the open file object is returned.

    NOTE(review): the two success paths return different types (text vs. an
    open file object); confirm callers handle both.

    Raises:
        ValueError: when ``url`` can be neither fetched nor opened.
    """
    import requests
    from requests.exceptions import MissingSchema

    try:
        response = requests.get(url)
        if response.ok:
            return response.text
        else:
            # Raised only to reach the file fallback in the handler below.
            raise requests.HTTPError
    except (MissingSchema, requests.HTTPError):
        # Not a valid URL, is it a file?
        try:
            return open(url, mode='r')
        except IOError:
            raise ValueError('Invalid URL or file: {}'.format(url))
开发者ID:pytroll,项目名称:satpy,代码行数:18,代码来源

示例6: md5_sum_from_url
# Required import: from requests import exceptions [as alias]
# Or: from requests.exceptions import MissingSchema [as alias]
def md5_sum_from_url(url):
    """Return the MD5 hex digest of the content streamed from ``url``.

    Returns None when the request raises InvalidSchema or MissingSchema, or
    when more than 50 * 1024 non-empty chunks have been read.
    """
    try:
        r = requests.get(url, stream=True)
    except InvalidSchema:
        return None
    except MissingSchema:
        return None
    chunk_counter = 0
    hash_store = hashlib.md5()
    for chunk in r.iter_content(chunk_size=1024):
        if chunk:  # filter out keep-alive new chunks
            hash_store.update(chunk)
            chunk_counter += 1
            # It is not possible to send greater than 50 MB via Telegram
            if chunk_counter > 50 * 1024:
                return None
    return hash_store.hexdigest()
开发者ID:Fillll,项目名称:reddit2telegram,代码行数:19,代码来源

示例7: test_url_generated_by_http_conn_id
# Required import: from requests import exceptions [as alias]
# Or: from requests.exceptions import MissingSchema [as alias]
def test_url_generated_by_http_conn_id(self, mock_request, mock_session):
    """Check the request made by a hook built from ``http_conn_id`` only."""
    hook = SlackWebhookHook(http_conn_id='slack-webhook-url')
    try:
        hook.execute()
    except MissingSchema:
        # Ignored: the assertion below only inspects how mock_request was called.
        pass
    mock_request.assert_called_once_with(
        self.expected_method,
        self.expected_url,
        headers=mock.ANY,
        data=mock.ANY
    )
    mock_request.reset_mock()
开发者ID:apache,项目名称:airflow,代码行数:15,代码来源

示例8: test_url_generated_by_endpoint
# Required import: from requests import exceptions [as alias]
# Or: from requests.exceptions import MissingSchema [as alias]
def test_url_generated_by_endpoint(self, mock_request, mock_session):
    """Check the request made by a hook built from ``webhook_token`` only."""
    hook = SlackWebhookHook(webhook_token=self.expected_url)
    try:
        hook.execute()
    except MissingSchema:
        # Ignored: the assertion below only inspects how mock_request was called.
        pass
    mock_request.assert_called_once_with(
        self.expected_method,
        self.expected_url,
        headers=mock.ANY,
        data=mock.ANY
    )
    mock_request.reset_mock()
开发者ID:apache,项目名称:airflow,代码行数:15,代码来源

示例9: test_url_generated_by_http_conn_id_and_endpoint
# Required import: from requests import exceptions [as alias]
# Or: from requests.exceptions import MissingSchema [as alias]
def test_url_generated_by_http_conn_id_and_endpoint(self, mock_request, mock_session):
    """Check the request made by a hook built from both connection id and token."""
    hook = SlackWebhookHook(http_conn_id='slack-webhook-host',
                            webhook_token='B000/XXX')
    try:
        hook.execute()
    except MissingSchema:
        # Ignored: the assertion below only inspects how mock_request was called.
        pass
    mock_request.assert_called_once_with(
        self.expected_method,
        self.expected_url,
        headers=mock.ANY,
        data=mock.ANY
    )
    mock_request.reset_mock()
开发者ID:apache,项目名称:airflow,代码行数:16,代码来源

示例10: get_resp
# Required import: from requests import exceptions [as alias]
# Or: from requests.exceptions import MissingSchema [as alias]
def get_resp(url):
    """Get webpage response as an lxml.html.HtmlElement object.

    Any exception is reported on stderr and then re-raised.
    """
    try:
        headers = {"User-Agent": random.choice(USER_AGENTS)}
        try:
            request = requests.get(url, headers=headers, proxies=get_proxies())
        except MissingSchema:
            # Retry once with the URL rewritten by add_protocol().
            url = add_protocol(url)
            request = requests.get(url, headers=headers, proxies=get_proxies())
        return lh.fromstring(request.text.encode("utf-8") if PY2 else request.text)
    except Exception:
        sys.stderr.write("Failed to retrieve {0}.\n".format(url))
        raise
开发者ID:huntrar,项目名称:scrape,代码行数:15,代码来源

示例11: get_raw_resp
# Required import: from requests import exceptions [as alias]
# Or: from requests.exceptions import MissingSchema [as alias]
def get_raw_resp(url):
    """Get webpage response as a unicode string.

    Any exception is reported on stderr and then re-raised.
    """
    try:
        headers = {"User-Agent": random.choice(USER_AGENTS)}
        try:
            request = requests.get(url, headers=headers, proxies=get_proxies())
        except MissingSchema:
            # Retry once with the URL rewritten by add_protocol().
            url = add_protocol(url)
            request = requests.get(url, headers=headers, proxies=get_proxies())
        # NOTE(review): when PY2 is truthy the value returned is the result of
        # .encode("utf-8"), not the text itself.
        return request.text.encode("utf-8") if PY2 else request.text
    except Exception:
        sys.stderr.write("Failed to retrieve {0} as str.\n".format(url))
        raise
开发者ID:huntrar,项目名称:scrape,代码行数:15,代码来源

示例12: example
# Required import: from requests import exceptions [as alias]
# Or: from requests.exceptions import MissingSchema [as alias]
def example(host, user, password, token):
    """run the example.

    Builds a MatrixClient for ``host`` (with ``token`` when it is truthy,
    otherwise via ``login_with_password``) and prints the room ids returned
    by ``client.get_rooms()``.

    Exit codes: 2 (MatrixRequestError code 403), 3 (code 401), 4 (any other
    MatrixRequestError), 5 (MissingSchema), 6 (InvalidSchema).
    """
    client = None
    try:
        if token:
            print('token login')
            client = MatrixClient(host, token=token, user_id=user)
        else:
            print('password login')
            client = MatrixClient(host)
            token = client.login_with_password(user, password)
            print('got token: %s' % token)
    except MatrixRequestError as e:
        print(e)
        if e.code == 403:
            print("Bad username or password")
            exit(2)
        elif e.code == 401:
            print("Bad username or token")
            exit(3)
        else:
            print("Verify server details.")
            exit(4)
    except MissingSchema as e:
        print(e)
        print("Bad formatting of URL.")
        exit(5)
    except InvalidSchema as e:
        print(e)
        print("Invalid URL schema")
        exit(6)
    print("is in rooms")
    for room_id, room in client.get_rooms().items():
        print(room_id)
开发者ID:saadnpq,项目名称:matrixcli,代码行数:36,代码来源

示例13: main
# Required import: from requests import exceptions [as alias]
# Or: from requests.exceptions import MissingSchema [as alias]
def main(host, username, password, room_id_alias):
    """Log in, join ``room_id_alias`` and send each input line to the room.

    The input loop ends when the line read is "/quit".

    Exit codes: 4 (login MatrixRequestError code 403), 2 (any other login
    MatrixRequestError), 3 (MissingSchema), 11 (join error code 400),
    12 (any other join error).
    """
    client = MatrixClient(host)

    try:
        client.login_with_password(username, password)
    except MatrixRequestError as e:
        print(e)
        if e.code == 403:
            print("Bad username or password.")
            sys.exit(4)
        else:
            print("Check your sever details are correct.")
            sys.exit(2)
    except MissingSchema as e:
        print("Bad URL format.")
        print(e)
        sys.exit(3)

    try:
        room = client.join_room(room_id_alias)
    except MatrixRequestError as e:
        print(e)
        if e.code == 400:
            print("Room ID/Alias in the wrong format")
            sys.exit(11)
        else:
            print("Couldn't find room.")
            sys.exit(12)

    room.add_listener(on_message)
    client.start_listener_thread()

    while True:
        msg = samples_common.get_input()
        if msg == "/quit":
            break
        else:
            room.send_text(msg)
开发者ID:saadnpq,项目名称:matrixcli,代码行数:40,代码来源

示例14: next_run_time
# Required import: from requests import exceptions [as alias]
# Or: from requests.exceptions import MissingSchema [as alias]
def next_run_time(self):
    """Return the JSON body of the scheduler's ``next_runtime`` endpoint.

    Returns the string "Scheduler Unreachable" when the request raises
    ConnectionError, MissingSchema or ReadTimeout.
    """
    try:
        # NOTE(review): the replacement field after `/next_runtime/` is empty
        # (`{}`), which is a syntax error in an f-string; the expression was
        # presumably lost when the page was extracted.
        return get(
            f"{app.scheduler_address}/next_runtime/{}", timeout=0.01
        ).json()
    except (ConnectionError, MissingSchema, ReadTimeout):
        return "Scheduler Unreachable"
开发者ID:eNMS-automation,项目名称:eNMS,代码行数:9,代码来源

示例15: time_before_next_run
# Required import: from requests import exceptions [as alias]
# Or: from requests.exceptions import MissingSchema [as alias]
def time_before_next_run(self):
    """Return the JSON body of the scheduler's ``time_left`` endpoint.

    Returns the string "Scheduler Unreachable" when the request raises
    ConnectionError, MissingSchema or ReadTimeout.
    """
    try:
        # NOTE(review): the replacement field after `/time_left/` is empty
        # (`{}`), which is a syntax error in an f-string; the expression was
        # presumably lost when the page was extracted.
        return get(
            f"{app.scheduler_address}/time_left/{}", timeout=0.01
        ).json()
    except (ConnectionError, MissingSchema, ReadTimeout):
        return "Scheduler Unreachable"
开发者ID:eNMS-automation,项目名称:eNMS,代码行数:9,代码来源







      CopyRight 2018-2019 实验室设备网 版权所有